/*
 * MIT License
 *
 * Copyright (c) 2020 wen.gu <454727014@qq.com>
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in all
 * copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

 /***************************************************************************
 * Name: registration_center_client.cpp
 *
 * Purpose: registration center client side default implementation
 *
 * Developer:
 *   wen.gu , 2021-09-07
 *
 * TODO:
 *
 ***************************************************************************/

 /******************************************************************************
 **    INCLUDES
 ******************************************************************************/
#include "icpp/com/registration_center_client.h"

#include "icpp/com/message_broker_simple.h"
#include "icpp/com/proxy_method.h"
#include "icpp/com/proxy_event.h"
#include "icpp/executor/timer.h"
#include "icpp/com/response_builder.h"

#define LOG_TAG "rgcc"
#include "icpp/core/log.h"

namespace icpp
{
namespace com
{
/******************************************************************************
 **    MACROS
 ******************************************************************************/
/**
 * Endpoint url used by the client side broker to reach the registration center.
 * NOTE(review): the path starts with '/temp' rather than '/tmp' - confirm this is intended.
 */
#define REGISTRATION_CENTER_URL "ipc:///temp/com.registration_center"
/** Interface version handed to the broker when it is initialized for the registration center. */
#define REGISTRATION_CENTER_INTERFACE_VERSION 0
/******************************************************************************
 **    VARIABLE DEFINITIONS
 ******************************************************************************/

/**
 * Private implementation (pimpl) of RegistrationCenterClient.
 *
 * Holds the message broker, the proxy methods/events bound to that broker and
 * the listener/handler tables. 'lock_' is taken by the code in this file
 * before 'srv_handlers_', 'online_handlers_' and 'is_reg_center_online' are
 * accessed.
 */
class RegistrationCenterClient::Impl
{
public:
    using MessageBrokerPtr = MessageBroker::MessageBrokerPtr;
    using Timer = executor::Timer;
    /** std::string: service name, Timer: periodic 'report alive' timer of that service */
    using ReportorMap = std::map<std::string, Timer>;
    
    /** uint32_t: listener id */
    using ServiceListenerMap = std::map<uint32_t, ServiceActivityStatusListener>;
    /** std::string: service name */
    using ServiceActivityStatusHandlerMap = std::map<std::string, ServiceListenerMap>;

    /** uint32_t: handler id */
    using OnlineHandlerMap = std::map<uint32_t, RegistrationCenterOnlineHandler>;
public:
    /** last connection state reported by the broker's connection state handler */
    bool is_reg_center_online = false;
    /** broker shared by all proxy methods/events below; must be declared before them */
    MessageBrokerPtr msg_broker_ = nullptr;
    /** running auto-report timers, keyed by service name (accessed without 'lock_' in this file) */
    ReportorMap repotors_;
    /** registered service activity listeners, keyed by service name then listener id */
    ServiceActivityStatusHandlerMap srv_handlers_;
    /** registered registration center online handlers, keyed by handler id */
    OnlineHandlerMap online_handlers_;
    std::mutex lock_;
public:
    /** remote methods of the registration center, one proxy per message id */
    ProxyMethod<MessageBrokerPtr, (MessageId)RegistrationCenterMethod::kAllocUrls, UrlList, const std::string&> alloc_urls;
    ProxyMethod<MessageBrokerPtr, (MessageId)RegistrationCenterMethod::kRegisterService, void, const ServiceInfo&> register_service;
    ProxyMethod<MessageBrokerPtr, (MessageId)RegistrationCenterMethod::kUnregisterService, void, const std::string&> unregister_service;
    ProxyMethod<MessageBrokerPtr, (MessageId)RegistrationCenterMethod::kGetServiceInfo, ServiceInfo, const std::string&> get_service_info;
    ProxyMethod<MessageBrokerPtr, (MessageId)RegistrationCenterMethod::kIsServiceExist, bool, const std::string&> is_service_exist;
    ProxyMethod<MessageBrokerPtr, (MessageId)RegistrationCenterMethod::kReportAlive, void, const std::string&> report_alive;
    ProxyMethod<MessageBrokerPtr, (MessageId)RegistrationCenterMethod::kGetServiceNameList, ServiceNameArray, void> get_service_name_list;
    /**
     * NOTE(review): this proxy uses the same message id (kGetServiceNameList) as
     * 'get_service_name_list' although it returns ServiceInfoArray - looks like a
     * copy/paste slip; confirm against the RegistrationCenterMethod enum.
     */
    ProxyMethod<MessageBrokerPtr, (MessageId)RegistrationCenterMethod::kGetServiceNameList, ServiceInfoArray, void> get_service_list;

    /** event raised by the registration center when the activity status of a service changes */
    ProxyEvent<MessageBrokerPtr, (MessageId)RegistrationCenterEvent::kServiceActivityStatusChanged, ServiceActivityStatusInfo> service_listener;
public:
    Impl();
    ~Impl();
};

/**
 * ServiceFinder implementation for a single service name, built on top of
 * the RegistrationCenterClient singleton.
 */
class MyServiceFinder: public ServiceFinder
{
public:
    using IcppErrc = core::IcppErrc;
public:
    /** @param srv_name name of the service this finder looks for (copied into 'srv_name_') */
    MyServiceFinder(const std::string& srv_name);
    ~MyServiceFinder();
public:
    /**
     * try to find a service instance, and return the result immediately.
    */
    Result<ServiceInfo> tryFindService() override;

    /**
     * start to find a service instance and wait until timeout or until a valid service instance has been found
     */
    Result<ServiceInfo> findService(uint32_t wait_ms = FIND_SERVICE_WAIT_INFINITE) override;

    /**
     * cancel a pending 'findService' call that is currently in waiting state.
     */
    Result<void> cancelFindService() override;

    /**
     * start to find a service in async mode, the result is delivered through the callback 'handler'
     */
    Result<void> startFindServiceAsync(OnFindServiceHandler handler) override;

     /**
     * stop finding a service in async mode
     */   
    Result<void> stopFindServiceAsync() override;

private:
    /** true while a 'findService' call is considered to be waiting */
    bool is_wait_ = false;
    /** id of the listener registered by 'findService' */
    uint32_t listener_id_for_wait_ = INVALID_LISTENER_ID;
    /** id of the listener registered by 'startFindServiceAsync', INVALID_LISTENER_ID when not running */
    uint32_t listener_id_for_async_ = INVALID_LISTENER_ID;
    std::string srv_name_;
    /** taken by every find/cancel/start/stop method of this class */
    std::mutex lock_;
    /** builder whose response the current 'findService' call waits on */
    ResponseBuilder<ServiceInfo>::ResponseBuilderPtr res_builder_for_wait_ = nullptr;
    //RegistrationCenterClient reg_center_;
};

/******************************************************************************
 **   inner FUNCTION DEFINITIONS
 ******************************************************************************/

/**
 * Create a client role MessageBrokerSimple for the service described by
 * 'srv_info', initialize it with the urls of that service and start it.
 *
 * @param srv_info       service description; its name, interface version and urls are used.
 * @param state_handler  callback stored in the broker's initialize parameters.
 * @return the started broker on success, otherwise the error code returned by
 *         initialize() or start(). When start() fails the broker is
 *         uninitialized again before returning.
 */
static Result<MessageBroker::MessageBrokerPtr> InitializeMessageBrokerAsClient(const ServiceInfo& srv_info, MessageBrokerConnectionStateHandler state_handler)
{
    MessageBrokerInitializeParam init_param;
    init_param.broker_name = "cli: " + srv_info.service_name;
    init_param.protocol_version = COM_PROTOCOL_VERSION;
    init_param.interface_version = srv_info.interface_version;
    init_param.connection_state_handler = state_handler;

    MessageBroker::MessageBrokerPtr broker = std::make_shared<MessageBrokerSimple>(MessageBrokerRole::kClient);

    /** step 1: initialize the broker with the service urls */
    core::IcppErrc init_ret = broker->initialize(srv_info.urls, init_param);

    if (init_ret != core::IcppErrc::OK)
    {
        LOGE("initialize message broker(%s) as 'client' failed(%s)\n", init_param.broker_name.c_str(), core::ErrorStr(init_ret));
        return init_ret;
    }

    /** step 2: start the broker, roll back the initialization on failure */
    core::IcppErrc start_ret = broker->start();

    if (start_ret != core::IcppErrc::OK)
    {
        LOGE("message broker(%s) as 'client' connect to failed(%s)\n", init_param.broker_name.c_str(), core::ErrorStr(start_ret));
        broker->uninitialize();
        return start_ret;
    }

    return broker;
}


/******************************************************************************
 **    FUNCTION DEFINITIONS
 ******************************************************************************/

/**
 * Create the client role broker, bind every proxy method/event to it and
 * initialize the broker with the registration center url.
 *
 * The member initializers are listed in declaration order (the order the
 * compiler actually uses), so no -Wreorder warning is raised.
 */
RegistrationCenterClient::Impl::Impl()
    :msg_broker_(std::make_shared<MessageBrokerSimple>(MessageBrokerRole::kClient)),
    alloc_urls(msg_broker_),
    register_service(msg_broker_),
    unregister_service(msg_broker_),
    get_service_info(msg_broker_),
    is_service_exist(msg_broker_),
    report_alive(msg_broker_),
    get_service_name_list(msg_broker_),
    get_service_list(msg_broker_),
    service_listener(msg_broker_)
{
    const std::string reg_center_url = REGISTRATION_CENTER_URL;
    UrlList urls;
    urls.push_back(reg_center_url);
    MessageBrokerInitializeParam param;
    param.broker_name = "cli: " + reg_center_url;
    param.interface_version = REGISTRATION_CENTER_INTERFACE_VERSION;
    param.protocol_version = COM_PROTOCOL_VERSION;
    param.connection_state_handler = [this](EndpointId service_id, bool is_connected)
    {
        (void)service_id; /** not needed here */
        core::AutoLock al(this->lock_);
        if (is_connected)
        {
            /**
             * online handlers are only notified when the connection comes up.
             * iterate by reference: copying each entry would copy its callback.
             * the handlers run while 'lock_' is held.
             */
            for (auto& it: this->online_handlers_)
            {
                if (it.second)
                {
                    it.second(is_connected);
                }
            }
        }

        this->is_reg_center_online = is_connected;
    };

    /** do not silently swallow an initialization failure, at least report it */
    core::IcppErrc ret = msg_broker_->initialize(urls, param);

    if (core::IcppErrc::OK != ret)
    {
        LOGE("initialize message broker(%s) for registration center failed(%s)\n", param.broker_name.c_str(), core::ErrorStr(ret));
    }

    /** todo , add auto reconnect to registration center service */
}

/**
 * Tear down the service activity event: unsubscribe it and clear its
 * receive handler while 'lock_' is held.
 */
RegistrationCenterClient::Impl::~Impl()
{
    core::AutoLock guard(lock_);

    /** first stop the subscription, then drop the receive handler */
    service_listener.unsubscribe();
    service_listener.setReceiveHandler(nullptr);
}

///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////
/**
 * Create a finder for the service named 'srv_name'. Only the name is stored;
 * nothing is registered at the registration center client here.
 */
MyServiceFinder::MyServiceFinder(const std::string& srv_name)
    :srv_name_(srv_name)
{
    /** todo something */
}

/**
 * Destroy the finder.
 * NOTE(review): listeners registered through 'findService' or
 * 'startFindServiceAsync' are not removed here - confirm whether callers are
 * expected to cancel/stop before destroying the finder.
 */
MyServiceFinder::~MyServiceFinder()
{
    /** todo something */
}

/**
 * Ask the registration center for the info of 'srv_name_' and hand back the
 * result of that single request.
 *
 * @return the result carried by the response of getServiceInfo().
*/
Result<ServiceInfo> MyServiceFinder::tryFindService() 
{
    RegistrationCenterClient& reg_center = RegistrationCenterClient::getInstance();
    Response<ServiceInfo> info_response = reg_center.getServiceInfo(srv_name_);

    /** todo refineme this maybe dead block */
    return info_response.getResult();
}

/**
 * start to find a service instance and wait until timeout or until a valid
 * service instance has been found.
 *
 * A listener is registered for 'srv_name_'; the first 'online' notification
 * completes the wait. 'lock_' is only held while the finder state is changed,
 * never while waiting, so that 'cancelFindService' is able to interrupt the
 * wait from another thread.
 *
 * @param wait_ms  wait time in milliseconds, values <= GET_SERVICE_WAIT_INFINITE wait without timeout.
 *                 NOTE(review): the default argument is named FIND_SERVICE_WAIT_INFINITE while
 *                 the check below uses GET_SERVICE_WAIT_INFINITE - confirm both are the same value.
 * @return the found service info,
 *         IcppErrc::InvalidStatus if another findService call is already waiting,
 *         the error of addListener if the listener could not be registered,
 *         IcppErrc::Timeout if nothing was found within 'wait_ms',
 *         or the error set by 'cancelFindService'.
 */
Result<ServiceInfo> MyServiceFinder::findService(uint32_t wait_ms /*= FIND_SERVICE_WAIT_INFINITE*/)
{
    ResponseBuilder<ServiceInfo>::ResponseBuilderPtr res_builder_ptr = std::make_shared< ResponseBuilder<ServiceInfo> >();
    Response<ServiceInfo> res = res_builder_ptr->getResponse();

    {
        core::AutoLock al(lock_);

        if (is_wait_)
        {
            return Result<ServiceInfo>::FromError(IcppErrc::InvalidStatus);
        }

        /** the listener shares ownership of the builder, so it stays valid as long as the listener exists */
        Response<uint32_t> listener_res = RegistrationCenterClient::getInstance().addListener(srv_name_, [res_builder_ptr](ServiceActivityStatus status, const ServiceInfo& srv_info)
        {
            if (status == ServiceActivityStatus::kOnline)
            {
                res_builder_ptr->setValue(srv_info);
            }
        });

        Result<uint32_t> listener_rt = listener_res.getResult(); /** todo, maybe dead block?? */

        if (listener_rt.hasValue() == false)
        {
            /** the waiting state is not entered, so a later call is not rejected */
            return Result<ServiceInfo>::FromError(listener_rt.error());
        }

        /** enter the waiting state only after the listener is really registered */
        is_wait_ = true;
        res_builder_for_wait_ = res_builder_ptr;
        listener_id_for_wait_ = listener_rt.value();
    }

    /** wait without holding 'lock_' */
    bool is_ready = true;

    if (wait_ms <= GET_SERVICE_WAIT_INFINITE)
    {
        res.wait();
    }
    else
    {
        is_ready = (res.waitFor(std::chrono::milliseconds(wait_ms)) == ResponseStatus::kReady);
    }

    {
        core::AutoLock al(lock_);

        /**
         * clean up only if this call still owns the waiting state: 'cancelFindService'
         * clears 'is_wait_' and removes the listener itself, and a later findService
         * call installs its own builder.
         */
        if (is_wait_ && (res_builder_for_wait_ == res_builder_ptr))
        {
            RegistrationCenterClient::getInstance().removeListener(srv_name_, listener_id_for_wait_);
            listener_id_for_wait_ = INVALID_LISTENER_ID;
            res_builder_for_wait_ = nullptr;
            is_wait_ = false;
        }
    }

    if (is_ready)
    {
        return res.getResult();
    }

    return Result<ServiceInfo>::FromError(IcppErrc::Timeout);
}

/**
 * Cancel a 'findService' call that is in waiting state: the wait listener is
 * removed and the pending response is completed with IcppErrc::OperationCanceled.
 *
 * @return IcppErrc::InvalidStatus when no findService call is waiting, otherwise success.
 */
Result<void> MyServiceFinder::cancelFindService() 
{
    core::AutoLock guard(lock_);

    if (!is_wait_)
    {
        /** nothing to cancel */
        return Result<void>::FromError(IcppErrc::InvalidStatus);
    }

    RegistrationCenterClient& reg_center = RegistrationCenterClient::getInstance();
    reg_center.removeListener(srv_name_, listener_id_for_wait_);

    /** complete the pending response with the cancel error */
    res_builder_for_wait_->setError(IcppErrc::OperationCanceled);

    /** back to idle state */
    listener_id_for_wait_ = INVALID_LISTENER_ID;
    is_wait_ = false;

    return Result<void>::FromValue();
}

/**
 * Start finding the service in async mode: a listener is registered for
 * 'srv_name_' and 'handler' is called for every 'online' notification.
 *
 * @param handler  callback receiving the service info, must not be empty.
 * @return IcppErrc::BadParameter for an empty handler,
 *         IcppErrc::InvalidStatus when async finding is already running,
 *         the error of addListener when the registration failed, otherwise success.
 */
Result<void> MyServiceFinder::startFindServiceAsync(OnFindServiceHandler handler) 
{
    if (nullptr == handler)
    {
        return Result<void>::FromError(IcppErrc::BadParameter);
    }

    core::AutoLock guard(lock_);

    const bool is_running = (INVALID_LISTENER_ID != listener_id_for_async_);

    if (is_running)
    {
        return Result<void>::FromError(IcppErrc::InvalidStatus);
    }

    /** forward only 'online' notifications to the user callback */
    auto on_status_changed = [handler](ServiceActivityStatus status, const ServiceInfo& srv_info)
    {
        if (status != ServiceActivityStatus::kOnline)
        {
            return;
        }

        handler(srv_info);
    };

    RegistrationCenterClient& reg_center = RegistrationCenterClient::getInstance();
    Response<uint32_t> add_res = reg_center.addListener(srv_name_, on_status_changed);
    Result<uint32_t> add_rt = add_res.getResult(); /** todo, maybe dead block?? */

    if (!add_rt.hasValue())
    {
        return Result<void>::FromError(add_rt.error());
    }

    /** remember the id, it is needed to stop the async finding */
    listener_id_for_async_ = add_rt.value();

    return Result<void>::FromValue();
}

/**
 * Stop finding the service in async mode by removing the listener that
 * 'startFindServiceAsync' registered.
 *
 * @return IcppErrc::InvalidStatus when async finding is not running,
 *         otherwise the result of removeListener.
 */   
Result<void> MyServiceFinder::stopFindServiceAsync() 
{
    core::AutoLock guard(lock_);

    const bool is_running = (INVALID_LISTENER_ID != listener_id_for_async_);

    if (!is_running)
    {
        return Result<void>::FromError(IcppErrc::InvalidStatus);
    }

    RegistrationCenterClient& reg_center = RegistrationCenterClient::getInstance();
    Response<void> remove_res = reg_center.removeListener(srv_name_, listener_id_for_async_);

    /** the id is dropped no matter how the removal turns out */
    listener_id_for_async_ = INVALID_LISTENER_ID;

    /** todo, maybe dead block?? */
    return remove_res.getResult();
}

///////////////////////////////////////////////////////////////////////////////
///////////////////////////////////////////////////////////////////////////////

/** Create the client together with its private implementation object. */
RegistrationCenterClient::RegistrationCenterClient()
    :impl_(new Impl)
{
    /** todo something */
}

/**
 * Destroy the client.
 * NOTE(review): 'impl_' is created with 'new' in the constructor and nothing is
 * released here explicitly - presumably 'impl_' is a smart pointer declared in
 * the header; confirm.
 */
RegistrationCenterClient::~RegistrationCenterClient()
{
    /** todo, something */
}


/** for service */
/**
 * Allocate addresses (urls) for the service 'service_name' through the registration center.
 * Forwards to the 'alloc_urls' proxy method and returns its response.
 */
Response<UrlList> RegistrationCenterClient::allocUrls(const std::string& service_name)
{
    return impl_->alloc_urls(service_name);
}

//IcppErrc 
/**
 * Register the service described by 'param' at the registration center.
 * Forwards to the 'register_service' proxy method and returns its response.
 */
Response<void> RegistrationCenterClient::registerService(const ServiceInfo& param)
{
    return impl_->register_service(param);
}

/**
 * Unregister the service 'service_name' from the registration center.
 * Forwards to the 'unregister_service' proxy method and returns its response.
 */
Response<void> RegistrationCenterClient::unregisterService(const std::string& service_name)
{
    return impl_->unregister_service(service_name);
}

/** for client */
/**
 * Request the info of the named service from the registration center.
 * Forwards to the 'get_service_info' proxy method and returns its response.
 */
Response<ServiceInfo> RegistrationCenterClient::getServiceInfo(const std::string& service_anme)
{
    return impl_->get_service_info(service_anme);
}

/**
 * Ask the registration center whether the service 'service_name' exists.
 * Forwards to the 'is_service_exist' proxy method and returns its response.
 */
Response<bool> RegistrationCenterClient::isServiceExist(const std::string& service_name)
{
    return impl_->is_service_exist(service_name);
}

/**
 * Request the list of service names from the registration center.
 * Forwards to the 'get_service_name_list' proxy method and returns its response.
 */
Response<RegistrationCenterClient::ServiceNameArray> RegistrationCenterClient::getServiceNameList()
{
    return impl_->get_service_name_list();
}

/**
 * Request the list of service infos from the registration center.
 * Forwards to the 'get_service_list' proxy method and returns its response.
 * NOTE(review): that proxy is declared with the kGetServiceNameList message id -
 * confirm the request really yields service infos.
 */
Response<RegistrationCenterClient::ServiceInfoArray> RegistrationCenterClient::getServiceList()
{
    return impl_->get_service_list();
}

/**
 * Register 'listener' for activity status notifications of the service 'service_name'.
 *
 * The first registered listener also subscribes the service activity event
 * and installs the receive handler which dispatches to the listeners.
 *
 * @param service_name  name of the service to observe; must not be empty, because
 *                      'removeListener' rejects empty names and such a listener
 *                      could never be removed again.
 * @param listener      callback to invoke; must not be empty, because it is invoked
 *                      unchecked by the receive handler.
 * @return a response carrying the listener id (to be used with 'removeListener'),
 *         or IcppErrc::BadParameter for an empty name or listener.
 */
Response<uint32_t> RegistrationCenterClient::addListener(const std::string& service_name, ServiceActivityStatusListener listener)
{
    ResponseBuilder<uint32_t> res;

    if (service_name.empty() || (listener == nullptr))
    {
        res.setError(IcppErrc::BadParameter);
        return res.getResponse();
    }

    core::AutoLock al(impl_->lock_);
    Impl::ServiceListenerMap& new_listener_map = impl_->srv_handlers_[service_name];

    /**
     * pick the smallest id which is not in use. Using the map size as id would
     * hand out an id that is still in use once a listener with a smaller id
     * has been removed, and silently overwrite that listener.
     */
    uint32_t listener_id = 0;
    while ((INVALID_LISTENER_ID == listener_id) || (new_listener_map.find(listener_id) != new_listener_map.end()))
    {
        ++listener_id;
    }

    new_listener_map[listener_id] = listener;

    if (impl_->service_listener.getSubscriptionState() == SubscriptionState::kNotSubscribed)
    {
        impl_->service_listener.subscribe();
        impl_->service_listener.setReceiveHandler([this](std::unique_ptr<const ServiceActivityStatusInfo>&& srv_info)
        {
            /** NOTE(review): every status other than 'online' is dropped here, so listeners never see them - confirm this is intended */
            if (srv_info->status != ServiceActivityStatus::kOnline)
            {
                LOGE("the service(%s) not online\n", srv_info->srv_info.service_name.c_str());
                return ;
            }

            /** the listeners are invoked while 'lock_' is held */
            core::AutoLock al(this->impl_->lock_);
            Impl::ServiceActivityStatusHandlerMap::iterator it = this->impl_->srv_handlers_.find(srv_info->srv_info.service_name);
            if (it != this->impl_->srv_handlers_.end())
            {
                Impl::ServiceListenerMap& listener_map = it->second;
                for (auto& handler : listener_map)
                {
                    handler.second(srv_info->status, srv_info->srv_info);
                }
            }
        });
    }

    res.setValue(listener_id);

    return res.getResponse();
}

/**
 * Remove the listener 'listener_id' of the service 'service_name'.
 *
 * When the last listener of a service is removed the service entry is dropped,
 * and when no service entry is left the service activity event is unsubscribed
 * and its receive handler is cleared.
 *
 * @return IcppErrc::BadParameter for an empty name or INVALID_LISTENER_ID,
 *         IcppErrc::NotFound when the service or the listener id is unknown,
 *         otherwise success.
 */
Response<void> RegistrationCenterClient::removeListener(const std::string& service_name, uint32_t listener_id)
{
    ResponseBuilder<void> builder;

    const bool is_bad_param = service_name.empty() || (listener_id == INVALID_LISTENER_ID);

    if (is_bad_param)
    {
        builder.setError(IcppErrc::BadParameter);
        return builder.getResponse();
    }

    core::AutoLock guard(impl_->lock_);

    Impl::ServiceActivityStatusHandlerMap& services = impl_->srv_handlers_;
    auto service_pos = services.find(service_name);

    if (service_pos == services.end())
    {
        /** no listener at all for this service */
        builder.setError(IcppErrc::NotFound);
        return builder.getResponse();
    }

    Impl::ServiceListenerMap& listeners = service_pos->second;
    auto listener_pos = listeners.find(listener_id);

    if (listener_pos == listeners.end())
    {
        /** the service is known, but not this listener id */
        builder.setError(IcppErrc::NotFound);
        return builder.getResponse();
    }

    listeners.erase(listener_pos);

    if (listeners.empty())
    {
        /** last listener of this service: drop the service entry too */
        services.erase(service_pos);

        if (services.empty())
        {
            /** nobody is listening any more: stop receiving the event */
            impl_->service_listener.unsubscribe();
            impl_->service_listener.setReceiveHandler(nullptr);
        }
    }

    builder.setValue();

    return builder.getResponse();
}

/** for other */
/**
 * Report to the registration center that 'service_name' is alive (may be used
 * on service side or client side).
 * Forwards to the 'report_alive' proxy method and returns its response.
 */
Response<void> RegistrationCenterClient::reportAlive(const std::string& service_name)
{
    return impl_->report_alive(service_name);
}

/**
 * Start reporting the alive status of 'service_name' periodically.
 *
 * The timer stored for 'service_name' (created on first use) is configured
 * with 'interval_ms' and started; each timer callback calls 'reportAlive'.
 *
 * @param service_name  name of the service to report for.
 * @param interval_ms   report interval in milliseconds, the default is 1 sec.
 * @return a response which is already completed with success. The value is set
 *         explicitly, as the other void operations of this class do; without it
 *         the returned response would never receive a result.
 */
Response<void> RegistrationCenterClient::startAutoReportAlive(const std::string& service_name, uint32_t interval_ms /*= 1000*/)
{
    /** NOTE(review): 'repotors_' is accessed without 'lock_' here - confirm callers are single threaded */
    Impl::Timer& timer = impl_->repotors_[service_name];
    timer.set_periodic_interval(interval_ms);
    timer.start([this, service_name]()
    {
        this->reportAlive(service_name);
    });

    ResponseBuilder<void> res_builder;
    res_builder.setValue();

    return res_builder.getResponse();
}

/**
 * Stop the periodic alive report of 'service_name' and drop its timer.
 *
 * @return IcppErrc::NotFound when no auto report is stored for 'service_name',
 *         otherwise success. The success value is set explicitly, as the other
 *         void operations of this class do; without it the returned response
 *         would never receive a result.
 */
Response<void> RegistrationCenterClient::stopAutoReportAlive(const std::string& service_name)
{
    /** NOTE(review): 'repotors_' is accessed without 'lock_' here - confirm callers are single threaded */
    Impl::ReportorMap::iterator it = impl_->repotors_.find(service_name);
    ResponseBuilder<void> res_builder;

    if (it != impl_->repotors_.end())
    {
        Impl::Timer& timer = it->second;
        timer.stop();
        impl_->repotors_.erase(it);
        res_builder.setValue();
    }
    else
    {
        res_builder.setError(IcppErrc::NotFound);
    }

    return res_builder.getResponse();
}

/**
 * Register a handler which is notified when the registration center comes online.
 * If the registration center is already online the handler is called
 * immediately, from within this method and while the internal lock is held.
 *
 * @param handler  callback to register, must not be empty.
 * @return on success a response carrying the handler id, which can be passed
 *         to 'removeOnlineHandler'; IcppErrc::BadParameter for an empty handler.
 */
Response<uint32_t> RegistrationCenterClient::addOnlineHandler(RegistrationCenterOnlineHandler handler)
{
    ResponseBuilder<uint32_t> res_builder;
    if (handler == nullptr)
    {
        res_builder.setError(IcppErrc::BadParameter);
        return res_builder.getResponse();
    }

    core::AutoLock al(impl_->lock_);    

    /**
     * pick the smallest id which is not in use. Using the map size as id would
     * hand out an id that is still in use once a handler with a smaller id
     * has been removed, and silently overwrite that handler.
     */
    uint32_t handler_id = 0;
    while (impl_->online_handlers_.find(handler_id) != impl_->online_handlers_.end())
    {
        ++handler_id;
    }

    impl_->online_handlers_[handler_id] = handler;

    if (impl_->is_reg_center_online)
    {
        handler(impl_->is_reg_center_online);
    }    
    
    res_builder.setValue(handler_id);

    return res_builder.getResponse();
}

/**
 * Remove the online handler registered under 'handler_id'.
 *
 * @return IcppErrc::NotFound when no handler is stored under that id, otherwise success.
 */
Response<void>   RegistrationCenterClient::removeOnlineHandler(uint32_t handler_id)
{
    ResponseBuilder<void> builder;
    core::AutoLock guard(impl_->lock_);

    Impl::OnlineHandlerMap& handlers = impl_->online_handlers_;
    auto handler_pos = handlers.find(handler_id);

    if (handler_pos == handlers.end())
    {
        /** unknown id */
        builder.setError(IcppErrc::NotFound);
        return builder.getResponse();
    }

    builder.setValue();
    handlers.erase(handler_pos);

    return builder.getResponse();
}

/**
 * static: create a new finder (MyServiceFinder) for the service 'service_name'.
 * Each call returns a separate finder instance.
 */
RegistrationCenterClient::ServiceFinderPtr RegistrationCenterClient::getServiceFinder(const std::string& service_name)
{
    return std::make_shared<MyServiceFinder>(service_name);
}

/**
 * static: access the single RegistrationCenterClient instance. The instance is
 * a function local static, so it is created on the first call.
 */
RegistrationCenterClient& RegistrationCenterClient::getInstance()
{
    static RegistrationCenterClient reg_center;

    return reg_center;
}

} /** namespace com */
} /** namespace icpp */